import warnings
import numpy as np
import pandas as pd
from scipy.optimize import fsolve
import scipy.stats as si
from plotly.subplots import make_subplots
import plotly.graph_objects as go
warnings.filterwarnings("ignore")
def distance_default(df, col_name, v, sigmav):
    # Merton distance to default over a one-year horizon (T = 1):
    # DD = (ln(V/F) + r - sigma_V^2/2) / sigma_V
    df[col_name] = (np.log(df[v]/df["f_lagged"]) + (df["r"] - df[sigmav]**2/2))/df[sigmav]
def probability_default(df, col_name, dd_col_name):
    # PD = N(-DD), the standard normal mass below the default point.
    df[col_name] = si.norm.cdf(-df[dd_col_name], 0.0, 1.0)
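# Illustrative mapping (hypothetical numbers): a firm with DD = 2 has
# PD = N(-2) ~= 2.3%, while DD = 0 corresponds to PD = 50%.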
def plot_chart(data, title_name, y1_name, economic_variable=False):
    # Plot every method column against fiscal year; if an economic indicator
    # is present, it is the last column and goes on a secondary y-axis.
    x = data["fyear"].values
    fig = make_subplots(specs=[[{"secondary_y": True}]])
    end = 2 if economic_variable else 1
    for i in range(len(data.columns) - end):
        fig.add_trace(go.Scatter(name=data.columns[i+1], x=x, y=data.iloc[:, i+1]), secondary_y=False)
    fig.update_yaxes(title_text="Different Methods " + str(y1_name), secondary_y=False)
    if economic_variable:
        fig.add_trace(go.Scatter(name=data.columns[-1], x=x, y=data.iloc[:, -1]), secondary_y=True)
        fig.update_yaxes(title_text="Economic Indicator in %", secondary_y=True)
    fig.update_layout(showlegend=True, title_text="Line Chart for " + title_name)
    fig.show()
def equations(variables, *args):
    # Merton model system (equity as a call option on firm assets, T = 1):
    # eq1: E = V*N(d1) - exp(-r)*F*N(d2)   (option-pricing equation)
    # eq2: sigma_E = (V/E)*N(d1)*sigma_V   (volatility linkage)
    (v, sigmav) = variables
    (f_lagged, r, e, sigmae) = args
    d1 = (np.log(v/f_lagged) + (r + 0.5*sigmav**2))/sigmav
    d2 = d1 - sigmav
    eq1 = e - v*si.norm.cdf(d1, 0.0, 1.0) + np.exp(-r)*f_lagged*si.norm.cdf(d2, 0.0, 1.0)
    eq2 = sigmae - (v*si.norm.cdf(d1, 0.0, 1.0)*sigmav)/e
    return (eq1, eq2)
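# Quick sanity check of the system on hypothetical inputs (the numbers below
# are made up for illustration and are not drawn from the datasets loaded later):
# a firm with equity E = 3, debt face value F = 7, r = 5%, sigma_E = 60%.
_v_chk, _s_chk = fsolve(equations, (3 + 7, 0.10), args=(7, 0.05, 3, 0.60))
print("Sanity check - implied asset value and volatility:", _v_chk, _s_chk)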
def equation_v(variables, *args):
    # Same option-pricing equation, but with sigma_V known and a general
    # horizon `time`; the single unknown is the asset value V.
    v = variables
    (f_lagged, r, e, sigmav, time) = args
    d1 = (np.log(v/f_lagged) + (r + 0.5*sigmav**2)*time)/(sigmav*np.sqrt(time))
    d2 = d1 - sigmav*np.sqrt(time)
    eq1 = e - v*si.norm.cdf(d1, 0.0, 1.0) + np.exp(-r*time)*f_lagged*si.norm.cdf(d2, 0.0, 1.0)
    return eq1
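# Typical usage (sketch): once a sigma_V estimate is in hand, back out V from
# the observed equity value, e.g.
#   fsolve(equation_v, e + f_lagged, args=(f_lagged, r, e, sigmav, 1.0))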
def naive_method(df, lin_fac, mul_fac):
    # "Naive" estimator in the spirit of Bharath and Shumway (2008): proxy debt
    # volatility as lin_fac + mul_fac*sigma_E, set V = E + F, and take sigma_V
    # as the value-weighted average of equity and debt volatilities.
    suffix = "_" + str(lin_fac) + "_" + str(mul_fac)
    sigma_d = df["sigmae"]*mul_fac + lin_fac
    df["v" + suffix] = df["e"] + df["f_lagged"]
    df["sigmav" + suffix] = (df["e"]*df["sigmae"] + df["f_lagged"]*sigma_d)/df["v" + suffix]
    distance_default(df, "dd_naive" + suffix, "v" + suffix, "sigmav" + suffix)
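# Worked example (hypothetical numbers): with E = 2, F = 8, sigma_E = 0.5 and
# factors (0.05, 0.25): sigma_D = 0.05 + 0.25*0.5 = 0.175, V = 10, and
# sigma_V = (2*0.5 + 8*0.175)/10 = 0.24.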
def solver_method(df):
    # Solve the two Merton equations jointly for (V, sigma_V), one row at a time.
    df["v_solver"] = 0.0
    df["sigmav_solver"] = 0.0
    df["dd_solver"] = 0.0
    for i in range(len(df)):
        if df.loc[i, :].isnull().values.any():
            continue
        f_lagged = df.loc[i, "f_lagged"]
        r = df.loc[i, "r"]
        e = df.loc[i, "e"]
        sigmae = df.loc[i, "sigmae"]
        solver_args = (f_lagged, r, e, sigmae)
        # Starting guesses: V0 = E + F and sigma_V0 = 10%.
        df.loc[i, ["v_solver", "sigmav_solver"]] = fsolve(equations, (e + f_lagged, 0.10), args=solver_args)
    distance_default(df, "dd_solver", "v_solver", "sigmav_solver")
## Reading the interest rate data (3-month T-bill, FRED series DTB3)
daily_fed = pd.read_csv('/Users/global_minima/Downloads/DTB3.csv')
daily_fed.columns = daily_fed.columns.str.strip().str.lower()
daily_fed["date"] = pd.to_datetime(daily_fed["date"])
daily_fed["dtb3"] = pd.to_numeric(daily_fed["dtb3"], errors='coerce')
daily_fed = daily_fed.dropna(subset=["date", "dtb3"]).reset_index(drop=True)
daily_fed["fyear"] = daily_fed["date"].dt.year
# Convert the quoted percentage rate to a continuously compounded rate.
daily_fed["r"] = np.log(1 + daily_fed["dtb3"]/100.0)
# reset_index so fyear is a column, which the merges below require.
int_rates = daily_fed.groupby("fyear")["r"].first().reset_index()
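# For example, a quoted rate of 3.00 (percent) maps to ln(1.03) ~= 0.0296,
# i.e. about 2.96% continuously compounded.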
use_cols = ["cusip", "fyear", "dlc", "dltt", "indfmt", "datafmt", "popsrc", "fic", "consol"]
funda_data = pd.read_csv("/Users/global_minima/funda.csv", usecols=use_cols)
# Standard Compustat screens: industrial format, standardized data, domestic
# population, US-incorporated, consolidated statements, fiscal years 1970-2020.
funda_data_filtered = funda_data[(funda_data["indfmt"] == "INDL") & (funda_data["datafmt"] == "STD") &
                                 (funda_data["popsrc"] == "D") & (funda_data["fic"] == "USA") &
                                 (funda_data["consol"] == "C") & (funda_data["fyear"] >= 1970) &
                                 (funda_data["fyear"] <= 2020)].copy()
del funda_data
# Six-digit (issuer-level) CUSIP to match CRSP.
funda_data_filtered['cusip'] = funda_data_filtered['cusip'].str[0:6]
# Compustat reports dlc/dltt in $ millions; convert to dollars.
funda_data_filtered['dlc'] = funda_data_filtered['dlc']*1000000
funda_data_filtered['dltt'] = funda_data_filtered['dltt']*1000000
funda_data_filtered = funda_data_filtered.sort_values("fyear").reset_index(drop=True)
# Default point: current liabilities plus half of long-term debt (KMV convention).
funda_data_filtered["f"] = funda_data_filtered["dlc"] + funda_data_filtered["dltt"]*0.5
funda_data_filtered["f_lagged"] = funda_data_filtered.groupby(by=["cusip"])["f"].shift()
print(funda_data_filtered.head(10))
daily_data = pd.read_csv('/Users/global_minima/Downloads/daily_crsp.csv', dtype={"CUSIP": "object"})
daily_data.columns = daily_data.columns.str.strip().str.lower()
daily_data = daily_data[daily_data["date"] >= 19700101]
daily_data["date"] = pd.to_datetime(daily_data["date"], format="%Y%m%d")
daily_data["fyear"] = daily_data["date"].dt.year
daily_data = daily_data.sort_values("date").reset_index(drop=True)
daily_data["cusip"] = daily_data['cusip'].str[0:6]
# Market equity: CRSP prc is negative when it is a bid/ask midpoint, hence abs();
# shrout is in thousands of shares, hence the factor of 1000.
daily_data["e"] = abs(daily_data["prc"]*daily_data["shrout"]*1000)
# Coerce ret to numeric first (CRSP uses letter codes for missing returns),
# then drop rows with missing prices or returns.
daily_data["ret"] = pd.to_numeric(daily_data["ret"], errors='coerce')
daily_data = daily_data.dropna(subset=["prc", "ret"])
print(daily_data.head(10))
# Compound daily returns into a gross annual return per firm-year, then lag it
# one year within each firm.
annret = pd.DataFrame(daily_data.groupby(by=['cusip', 'fyear']).apply(lambda x:
                      np.exp(np.sum(np.log(1 + x['ret'])))))
annret = annret.rename(columns={0: "cum_ret"})
annret = pd.DataFrame(annret.groupby(by=['cusip'])["cum_ret"].shift())
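# For example, daily returns of +1% and -0.5% compound to 1.01*0.995 ~= 1.00495,
# i.e. a gross (not net) cumulative return.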
# Annualized equity volatility from daily returns (~250 trading days per year),
# lagged one year within each firm.
sigmae = pd.DataFrame(daily_data.groupby(by=['cusip', 'fyear'])['ret'].std()*np.sqrt(250))
sigmae = sigmae.rename(columns={"ret": "sigmae"})
sigmae = pd.DataFrame(sigmae.groupby(by=["cusip"])["sigmae"].shift())
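# E.g. a 2% daily return standard deviation annualizes to 0.02*sqrt(250) ~= 31.6%.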
equity_value = pd.DataFrame(daily_data.groupby(by=['cusip','fyear'])['e'].first())
final = pd.merge(annret,sigmae, on = ["cusip","fyear"],how = "inner")
final = final.merge(funda_data_filtered[["cusip","fyear","f_lagged"]],on = ["cusip","fyear"],how = "inner")
final = final.merge(int_rates,on = "fyear",how = "inner")
final = final.merge(equity_value,on = ["cusip","fyear"],how = "inner")
final = final.sort_values("fyear").reset_index(drop=True)
print(final.tail(10))
print("Sampling the Data of 1000 firms from the original dataframe")
number_of_firms_sample = 1000
sub_sample = pd.DataFrame(final.groupby('fyear').apply(lambda d: d.sample(number_of_firms_sample))).reset_index(drop = True)
sub_sample = sub_sample.dropna().reset_index(drop = True)
# Method 1 (Naive)
naive_method(sub_sample,0.05,0.25)
naive_method(sub_sample,0.05,0.5)
naive_method(sub_sample,0,0.25)
probability_default(sub_sample,"pd_naive_0.05_0.25","dd_naive_0.05_0.25")
probability_default(sub_sample,"pd_naive_0.05_0.5","dd_naive_0.05_0.5")
probability_default(sub_sample,"pd_naive_0_0.25","dd_naive_0_0.25")
# Method 2 (solver): treat both V and sigma_V as unknowns and solve jointly
solver_method(sub_sample)
probability_default(sub_sample,"pd_solver","dd_solver")
print("Completed the DD and PD for the Method 1 and 2")
## Method 3: iterative solver
def iterative_method(df):
    # For each firm-year: seed sigma_V with the solver estimate, back out a
    # monthly asset-value series from equity via equation_v, re-estimate
    # sigma_V from the implied asset returns, and repeat until convergence.
    # Note: relies on the global `final_data` built below for f_lagged and
    # for the next year's sigmav_solver seed.
    i = 0
    df["v_iterate"] = 0.0
    df["sigmav_iterate"] = 0.0
    while i < len(df):
        if df.loc[i, :].isnull().values.any():
            i += 1
            continue
        start_year = df.loc[i, "fyear"]
        cusip_id = df.loc[i, "cusip"]
        r = df.loc[i, "r"]
        e = df.loc[i, "e"]
        error = 1000000
        time_steps = len(df[(df["fyear"] == start_year) & (df["cusip"] == cusip_id)])
        try:
            f_lagged = final_data.loc[(final_data["fyear"] == (start_year + 1)) & (final_data["cusip"] == cusip_id), "f_lagged"].iloc[0]
            sigmav = df.loc[(df["fyear"] == (start_year + 1)) & (df["cusip"] == cusip_id), "sigmav_solver"].iloc[0]
        except IndexError:
            i += time_steps
            continue
        if sigmav == 0 or np.isnan(sigmav) or f_lagged == 0 or np.isnan(f_lagged):
            i += time_steps
            continue
        tr_df = final_data.loc[(final_data["fyear"] == start_year) & (final_data["cusip"] == cusip_id)].reset_index(drop=True)
        while error > 0.01:
            # Back out an implied asset value for every month in the year.
            value_company = []
            for j in range(time_steps):
                e = tr_df.loc[j, "e"]
                solver_args = (f_lagged, r, e, sigmav, 1)
                value_company.append(fsolve(equation_v, (e + f_lagged), args=solver_args)[0])
            # Annualized volatility of the implied monthly asset returns.
            sigmav_n = (np.std(pd.DataFrame(value_company).pct_change())*np.sqrt(12)).iloc[0]
            error = abs(sigmav_n - sigmav)
            sigmav = sigmav_n
        # Store the converged estimates on the first row of the following year.
        solver_args = (f_lagged, r, e, sigmav, 1)
        df.loc[(i + time_steps), ["v_iterate", "sigmav_iterate"]] = (fsolve(equation_v, (e + f_lagged), args=solver_args)[0], sigmav_n)
        i += time_steps
    return df
print("Started Resampling the data to monthly level for the daily CRSP Data")
monthly_data = daily_data.groupby("cusip").resample('M', on='date').first().reset_index(drop =True)
cols_taken = ["cusip","fyear","cum_ret","sigmae","r","dd_naive_0.05_0.25"
,"dd_naive_0.05_0.5","dd_naive_0_0.25","pd_naive_0.05_0.25","pd_naive_0.05_0.5",
"pd_naive_0_0.25","dd_solver","pd_solver","sigmav_solver"]
monthly_data = monthly_data.merge(funda_data_filtered[["cusip","fyear","f_lagged"]],on = ["cusip","fyear"],how = "left")
method_3_data = pd.merge(monthly_data[["date","fyear","cusip","e","f_lagged"]],sub_sample[cols_taken],on = ["cusip","fyear"],how = "inner")
method_3_data = method_3_data.sort_values("date").reset_index(drop=True)
# Drop rows containing NaN or +/-inf in any column.
method_3_data = method_3_data[method_3_data.replace([np.inf, -np.inf], np.nan).notnull().all(axis=1)]
yearly_cusips = pd.DataFrame(sub_sample.groupby("fyear")["cusip"].unique()).reset_index()
print("Completed the resampling method")
# Sample up to 250 firms per year (without replacement) from each year's CUSIP list.
sampled_frames = []
for i in range(len(yearly_cusips)):
    cusips = yearly_cusips.loc[i, "cusip"]
    chosen = np.random.choice(cusips, size=min(250, len(cusips)), replace=False)
    df1 = method_3_data[method_3_data["cusip"].isin(chosen) & (method_3_data["fyear"] == yearly_cusips.loc[i, "fyear"])]
    sampled_frames.append(df1)
sub_sample_1 = pd.concat(sampled_frames, ignore_index=True)
sub_sample_1 = sub_sample_1.loc[:,sub_sample_1.columns!="f_lagged"].reset_index(drop = True)
monthly_data = monthly_data.dropna().reset_index(drop = True)
final_data = pd.merge(sub_sample_1[sub_sample_1.columns[~sub_sample_1.columns.isin(["fyear","e","f_lagged"])]],monthly_data[["date","fyear","cusip","e","f_lagged"]],on = ["date","cusip"],how= "right")
final_data = final_data.sort_values(by = ["cusip","fyear"],ascending = [True, True]).reset_index(drop= True)
sub_sample_1 = sub_sample_1.sort_values(by = ["cusip","fyear"],ascending = [True, True]).reset_index(drop= True)
iterative_method(sub_sample_1)
df2 = sub_sample_1[sub_sample_1.replace([np.inf, -np.inf], np.nan).notnull().all(axis=1)]
df2 = df2.merge(final_data[["cusip","date","f_lagged"]],on = ["cusip","date"],how = "left")
distance_default(df2,"dd_iterate","v_iterate","sigmav_iterate")
probability_default(df2,"pd_iterate","dd_iterate")
# Trim extreme DD/PD estimates before computing statistics.
df2 = df2[(df2["pd_iterate"] < 0.75) & (df2["dd_iterate"] < 40)].reset_index(drop=True)
print(df2.head(10))
print("Completed the Method 3 calculations")
print("Started Descriptive Stats for the Methods")
desc_stats_columns = ["dd_naive_0.05_0.25","dd_naive_0.05_0.5","dd_naive_0_0.25","pd_naive_0.05_0.25","pd_naive_0.05_0.5",
"pd_naive_0_0.25","dd_solver","pd_solver","dd_iterate","pd_iterate"]
descriptive_stats = df2[desc_stats_columns].describe()
print(descriptive_stats)
## Correlation among all the methods
columns_DD = [x for x in desc_stats_columns if x.startswith('dd_')]
correlation_table_DD = df2[columns_DD].corr()
columns_PD = [x for x in desc_stats_columns if x.startswith('pd_')]
correlation_table_PD = df2[columns_PD].corr()
print("Correlation between Methods for Distance to Default is: \n",correlation_table_DD)
print("Correlation between Methods for Probability of Default is: \n",correlation_table_PD)
descriptive_stats_across_time = df2[["fyear"]+desc_stats_columns].groupby("fyear").describe()
descriptive_stats_across_time.columns = descriptive_stats_across_time.columns.map('{0[0]}_{0[1]}'.format)
descriptive_stats_across_time = descriptive_stats_across_time.reset_index()
for stat, label in [("mean", "Mean"), ("25%", "25%ile"), ("50%", "50%ile"), ("75%", "75%ile")]:
    plot_chart(descriptive_stats_across_time[["fyear"] + [s + "_" + stat for s in columns_DD]],
               label + " of Distance to Default (DD) Across Time", "DD in Years")
    plot_chart(descriptive_stats_across_time[["fyear"] + [s + "_" + stat for s in columns_PD]],
               label + " of Probability of Default (PD) Across Time", "Probability of Default")
print("Completed Initial Descriptive Plots")
## Loading the NBER recession indicator (FRED series USREC)
nber_data = pd.read_csv('/Users/global_minima/Downloads/USREC.csv')
nber_data["DATE"] = pd.to_datetime(nber_data["DATE"])
nber_data = nber_data.rename(columns={"USREC": "Recession_Flag"})
nber_data["fyear"] = nber_data["DATE"].dt.year
# Flag a year as recessionary if at least half of its months are flagged.
recession_data = nber_data.groupby('fyear')["Recession_Flag"].mean().round().reset_index()
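# For example, a year with 7 of 12 months in recession has mean 7/12 ~= 0.583,
# which rounds to 1 (recession); 4 of 12 gives 0.333, which rounds to 0.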
## Loading the BAA-Fed Funds spread data (FRED series BAAFFM)
baafa_data = pd.read_csv('/Users/global_minima/Downloads/BAAFFM.csv')
baafa_data["DATE"] = pd.to_datetime(baafa_data["DATE"])
baafa_data = baafa_data.rename(columns = {"BAAFFM":"BAA_Spread_Percentage"})
baafa_data["fyear"] = baafa_data["DATE"].dt.year
baafa_data = baafa_data.groupby('fyear')["BAA_Spread_Percentage"].mean().reset_index()
## Loading the Cleveland Financial Stress Index data
cfsi_data = pd.read_csv('/Users/global_minima/Downloads/CFSI.csv')
cfsi_data["DATE"] = pd.to_datetime(cfsi_data["DATE"])
cfsi_data = cfsi_data.rename(columns = {"CFSI":"CFSI_Stress_Value"})
cfsi_data["fyear"] = cfsi_data["DATE"].dt.year
cfsi_data = cfsi_data.groupby('fyear')["CFSI_Stress_Value"].mean().reset_index()
print("NBER data is: \n",recession_data.head(10))
print("BAA Spread data is: \n",baafa_data.head(10))
print("Cleveland Financial Stress Index data is: \n",cfsi_data.head(10))
print("Completed reading all the data")
print("Started Plotting Economic Indicators against the DD and PD for 3 Methods calcualted above")
merged_df = pd.merge(df2,recession_data,on = "fyear", how = "inner")
not_recession_df = merged_df[merged_df["Recession_Flag"]==0]
recession_df = merged_df[merged_df["Recession_Flag"]==1]
not_recession_df = not_recession_df[["fyear"]+desc_stats_columns].groupby(["fyear"]).mean().reset_index()
recession_df = recession_df[["fyear"]+desc_stats_columns].groupby(["fyear"]).mean().reset_index()
plot_chart(not_recession_df[["fyear"]+columns_DD],"Mean of Distance to Default (DD) Across Non Recessionary Period","DD in Years")
plot_chart(recession_df[["fyear"]+columns_DD],"Mean of Distance to Default (DD) Across Recessionary Period","DD in Years")
## Plot charts for PD
plot_chart(not_recession_df[["fyear"]+columns_PD],"Mean of Probability of Default (PD) Across Non Recessionary Period","Probability of Default")
plot_chart(recession_df[["fyear"]+columns_PD],"Mean of Probability of Default (PD) Across Recessionary Period","Probability of Default")
merged_baa = pd.merge(df2,baafa_data,on = "fyear", how = "inner")
merged_baa = merged_baa[["fyear","BAA_Spread_Percentage"]+desc_stats_columns].groupby(["fyear"]).mean().reset_index()
plot_chart(merged_baa[["fyear"]+columns_DD+["BAA_Spread_Percentage"]],"Mean of Distance to Default (DD) Across Time VS BAA Spread","DD in Years",True)
plot_chart(merged_baa[["fyear"]+columns_PD+["BAA_Spread_Percentage"]],"Mean of Probability of Default (PD) Across Time VS BAA Spread","Probability of Default",True)
merged_cfsi = pd.merge(df2,cfsi_data,on = "fyear", how = "inner")
merged_cfsi = merged_cfsi[["fyear","CFSI_Stress_Value"]+desc_stats_columns].groupby(["fyear"]).mean().reset_index()
plot_chart(merged_cfsi[["fyear"]+columns_DD+["CFSI_Stress_Value"]],"Mean of Distance to Default (DD) Across Time VS CFSI","DD in Years",True)
plot_chart(merged_cfsi[["fyear"]+columns_PD+["CFSI_Stress_Value"]],"Mean of Probability of Default (PD) Across Time VS CFSI","Probability of Default",True)
print("Code Executed Succesfully!!!!")